Skip to content

fix(envd): discard output when no subscriber is connected#2639

Merged
arkamar merged 2 commits into
mainfrom
fix/envd-discard-output-on-disconnect
May 14, 2026
Merged

fix(envd): discard output when no subscriber is connected#2639
arkamar merged 2 commits into
mainfrom
fix/envd-discard-output-on-disconnect

Conversation

@arkamar

@arkamar arkamar commented May 13, 2026

Copy link
Copy Markdown
Member

When a Start/Connect client disconnects while a process is producing output, the fan-out loop in MultiplexedChannel blocks trying to deliver to the dead subscriber's channel. This back-pressures the reader goroutine, which allocates a fresh 32 KiB buffer on every Read call, causing envd RSS to grow unboundedly.

The fix makes the fan-out skip cancelled subscribers and keep draining Source to void when none remain, so the child process is never back-pressured. The reader goroutines now reuse a single buffer and only allocate+copy when HasSubscribers() returns true.


Alternative solution to #2620

@cursor

cursor Bot commented May 13, 2026

Copy link
Copy Markdown

PR Summary

Medium Risk
Touches concurrent fan-out and process output streaming paths; mistakes here can deadlock output delivery or silently drop stdout/stderr under load.

Overview
This change intentionally drops stdout/stderr events (and their buffered log events) when MultiplexedChannel has no active subscribers, and adds HasSubscribers() to check for non-cancelled consumers before cloning/sending output. This reduces allocations by reusing a read buffer but makes output loss possible if a subscriber disconnects/reconnects, and adds an O(n) subscriber scan on every read. pkg.Version is bumped to 0.5.19.

Reviewed by Cursor Bugbot for commit 1069a1e. Bugbot is set up for automated code reviews on this repo. Configure here.

@codecov

codecov Bot commented May 13, 2026

Copy link
Copy Markdown

❌ 8 Tests Failed:

Tests completed Failed Passed Skipped
2618 8 2610 7
View the full list of 8 ❄️ flaky test(s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestSnapshotTemplateCreateSandbox

Flake rate in main: 54.99% (Passed 176 times, Failed 215 times)

Stack Traces | 0s run time
=== RUN   TestSnapshotTemplateCreateSandbox
=== PAUSE TestSnapshotTemplateCreateSandbox
=== CONT  TestSnapshotTemplateCreateSandbox
--- FAIL: TestSnapshotTemplateCreateSandbox (0.00s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestSnapshotTemplateCreateSandbox/overwritten_snapshot_build_is_served_immediately_on_sandbox_create

Flake rate in main: 54.99% (Passed 176 times, Failed 215 times)

Stack Traces | 31.5s run time
=== RUN   TestSnapshotTemplateCreateSandbox/overwritten_snapshot_build_is_served_immediately_on_sandbox_create
=== PAUSE TestSnapshotTemplateCreateSandbox/overwritten_snapshot_build_is_served_immediately_on_sandbox_create
=== CONT  TestSnapshotTemplateCreateSandbox/overwritten_snapshot_build_is_served_immediately_on_sandbox_create
    snapshot_template_test.go:294: Command [/bin/sh] output: event:{start:{pid:1251}}
    snapshot_template_test.go:294: Command [/bin/sh] output: event:{end:{exited:true  status:"exit status 0"}}
    snapshot_template_test.go:294: Command [/bin/sh] completed successfully in sandbox ihmniz1urdah6uvrec44x
    snapshot_template_test.go:299: 
        	Error Trace:	.../api/sandboxes/snapshot_template_test.go:299
        	Error:      	Not equal: 
        	            	expected: 201
        	            	actual  : 409
        	Test:       	TestSnapshotTemplateCreateSandbox/overwritten_snapshot_build_is_served_immediately_on_sandbox_create
--- FAIL: TestSnapshotTemplateCreateSandbox/overwritten_snapshot_build_is_served_immediately_on_sandbox_create (31.53s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestUpdateNetworkConfig

Flake rate in main: 76.91% (Passed 181 times, Failed 603 times)

Stack Traces | 44s run time
=== RUN   TestUpdateNetworkConfig
=== PAUSE TestUpdateNetworkConfig
=== CONT  TestUpdateNetworkConfig
Executing command curl in sandbox inzzttiu55hqcwqg93is0
--- FAIL: TestUpdateNetworkConfig (43.97s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false

Flake rate in main: 77.33% (Passed 175 times, Failed 597 times)

Stack Traces | 7.95s run time
=== RUN   TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false
Executing command curl in sandbox it7p8h4b49l9q4byqk1j9
    sandbox_network_update_test.go:372: Command [curl] output: event:{start:{pid:1355}}
    sandbox_network_update_test.go:372: Command [curl] output: event:{end:{exit_code:35  exited:true  status:"exit status 35"  error:"exit status 35"}}
Executing command curl in sandbox it7p8h4b49l9q4byqk1j9
    sandbox_network_update_test.go:372: Command [curl] output: event:{start:{pid:1356}}
    sandbox_network_update_test.go:372: Command [curl] output: event:{end:{exit_code:35  exited:true  status:"exit status 35"  error:"exit status 35"}}
    sandbox_network_update_test.go:391: Command [curl] output: event:{start:{pid:1357}}
    sandbox_network_update_test.go:391: Command [curl] output: event:{data:{stdout:"HTTP/2 302 \r\nx-content-type-options: nosniff\r\nlocation: https://dns.google/\r\ndate: Thu, 14 May 2026 10:53:21 GMT\r\ncontent-type: text/html; charset=UTF-8\r\nserver: HTTP server (unknown)\r\ncontent-length: 216\r\nx-xss-protection: 0\r\nx-frame-options: SAMEORIGIN\r\nalt-svc: h3=\":443\"; ma=2592000,h3-29=\":443\"; ma=2592000\r\n\r\n"}}
    sandbox_network_update_test.go:391: Command [curl] output: event:{end:{exited:true  status:"exit status 0"}}
    sandbox_network_update_test.go:391: Command [curl] completed successfully in sandbox it7p8h4b49l9q4byqk1j9
    sandbox_network_update_test.go:391: 
        	Error Trace:	.../api/sandboxes/sandbox_network_out_test.go:74
        	            				.../api/sandboxes/sandbox_network_update_test.go:60
        	            				.../api/sandboxes/sandbox_network_update_test.go:391
        	Error:      	An error is expected but got nil.
        	Test:       	TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false
        	Messages:   	https://8.8.8.8 should be blocked
--- FAIL: TestUpdateNetworkConfig/pause_resume_preserves_allow_internet_access_false (7.95s)
github.com/e2b-dev/infra/tests/integration/internal/tests/orchestrator::TestSandboxMemoryIntegrity

Flake rate in main: 66.42% (Passed 182 times, Failed 360 times)

Stack Traces | 84.8s run time
=== RUN   TestSandboxMemoryIntegrity
=== PAUSE TestSandboxMemoryIntegrity
=== CONT  TestSandboxMemoryIntegrity
    sandbox_memory_integrity_test.go:26: Build completed successfully
--- FAIL: TestSandboxMemoryIntegrity (84.83s)
github.com/e2b-dev/infra/tests/integration/internal/tests/orchestrator::TestSandboxMemoryIntegrity/tmpfs_hash

Flake rate in main: 67.30% (Passed 172 times, Failed 354 times)

Stack Traces | 36.7s run time
=== RUN   TestSandboxMemoryIntegrity/tmpfs_hash
=== PAUSE TestSandboxMemoryIntegrity/tmpfs_hash
=== CONT  TestSandboxMemoryIntegrity/tmpfs_hash
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{start:{pid:1258}}
Executing command bash in sandbox i9kbadb6qachbtfailt96 (user: root)
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stdout:"Total memory: 985 MB\nUsed memory before tmpfs mount: 187 MB\nFree memory before tmpfs mount: 797 MB\nMemory to use in integrity test (80% of free, min 64MB): 637 MB\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"637+0 records in\n637+0 records out\n667942912 bytes (668 MB, 637 MiB) copied, 11.5574 s, 57.8 MB/s\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"C"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"o"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"m"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"m"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"a"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"d"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"b"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"i"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"g"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"i"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"m"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"d: \"dd if=/dev/urandom of=/mnt/testfile bs=1M count=637\"\n\tUser time (seconds): 0.00\n\tSystem time (seconds): 11.28\n\tPercent of CPU this job got: 97%\n\tElapsed (wall clock) time (h:mm:ss or m"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:":"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"s"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"s"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:")"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:":"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"0:11.56"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"A"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"v"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"r"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"a"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"g"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"s"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"h"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"a"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"r"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"d"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"x"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"s"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"i"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"z"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"("}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"k"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"b"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"y"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"s"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:")"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:":"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"0"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\tAverage unshared data size (kbytes): 0\n\tAverage stack size (kbytes): 0\n\tAverage total size (kbytes): 0\n\tMaximum resident set size (kbytes): 2704\n\tAverage resident set size (kbytes): 0\n\tMajor (requiring I/O) page faults: 2\n\tMinor (reclaiming a frame) page faults: 346\n\tVoluntary context switches: 3\n\tInvoluntary context switches: 113\n\tSwaps: 0\n\tFile system inputs: 176\n\tFile system outputs: 0\n\tSocket messages sent: 0\n\tSocket messages received: 0\n\tSignals delivered: 0\n\tPage size (bytes): 4096\n\tExit status: 0\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stdout:"Used memory after tmpfs mount and file fill: 834 MB\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{end:{exited:true status:"exit status 0"}}
    sandbox_memory_integrity_test.go:70: Command [bash] completed successfully in sandbox i9y70fe1qp2daxpsog44h
Executing command bash in sandbox i9y70fe1qp2daxpsog44h (user: root)
    sandbox_memory_integrity_test.go:74: Command [bash] output: event:{start:{pid:1274}}
    sandbox_memory_integrity_test.go:74: Command [bash] output: event:{data:{stdout:"63f1e3961c730ba210e410b34e592f1007526874f9fb5f346f4d1ce327b1e4ef\n"}}
    sandbox_memory_integrity_test.go:74: Command [bash] output: event:{end:{exited:true status:"exit status 0"}}
    sandbox_memory_integrity_test.go:74: Command [bash] completed successfully in sandbox i9y70fe1qp2daxpsog44h
Executing command bash in sandbox i9y70fe1qp2daxpsog44h (user: root)
    sandbox_memory_integrity_test.go:99: Command [bash] output: event:{start:{pid:1277}}
    sandbox_memory_integrity_test.go:100: 
        	Error Trace:	.../tests/orchestrator/sandbox_memory_integrity_test.go:100
        	Error:      	Received unexpected error:
        	            	failed to execute command bash in sandbox i9y70fe1qp2daxpsog44h: invalid_argument: protocol error: incomplete envelope: unexpected EOF
        	Test:       	TestSandboxMemoryIntegrity/tmpfs_hash
--- FAIL: TestSandboxMemoryIntegrity/tmpfs_hash (36.71s)
github.com/e2b-dev/infra/tests/integration/internal/tests/proxies::TestSandboxAutoResumeViaProxy

Flake rate in main: 55.33% (Passed 176 times, Failed 218 times)

Stack Traces | 20.9s run time
=== RUN   TestSandboxAutoResumeViaProxy
=== PAUSE TestSandboxAutoResumeViaProxy
=== CONT  TestSandboxAutoResumeViaProxy
    auto_resume_test.go:97: [Status code: 502] Response body: {"sandboxId":"i99cowqyb8mgvza9nj0dv","message":"The sandbox is running but port is not open","port":8000,"code":502}
    auto_resume_test.go:97: [Status code: 502] Response body: {"sandboxId":"i99cowqyb8mgvza9nj0dv","message":"The sandbox is running but port is not open","port":8000,"code":502}
Executing command ls in sandbox i4z6yq57q1kx71gdlfxsv
    auto_resume_test.go:97: [Status code: 502] Response body: {"sandboxId":"i99cowqyb8mgvza9nj0dv","message":"The sandbox is running but port is not open","port":8000,"code":502}
    auto_resume_test.go:116: 
        	Error Trace:	.../tests/proxies/auto_resume_test.go:116
        	Error:      	Received unexpected error:
        	            	Get "http://localhost:3002": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
        	Test:       	TestSandboxAutoResumeViaProxy
--- FAIL: TestSandboxAutoResumeViaProxy (20.94s)
Executing command apt-get in sandbox iu6gfhc7c3do67tzls1y5 (user: root)
github.com/e2b-dev/infra/tests/integration/internal/tests/proxies::TestSandboxWithTrafficAccessTokenAutoResumeViaProxy

Flake rate in main: 55.33% (Passed 176 times, Failed 218 times)

Stack Traces | 19.3s run time
=== RUN   TestSandboxWithTrafficAccessTokenAutoResumeViaProxy
=== PAUSE TestSandboxWithTrafficAccessTokenAutoResumeViaProxy
=== CONT  TestSandboxWithTrafficAccessTokenAutoResumeViaProxy
    traffic_access_token_test.go:263: [Status code: 502] Response body: {"sandboxId":"iwo0gfyr56wjvqk4g891y","message":"The sandbox is running but port is not open","port":8080,"code":502}
Executing command ls in sandbox is2hl6xkfznjhkuvdd6o0
    traffic_access_token_test.go:263: [Status code: 502] Response body: {"sandboxId":"iwo0gfyr56wjvqk4g891y","message":"The sandbox is running but port is not open","port":8080,"code":502}
Executing command ls in sandbox ir2iabd14urpnweftqhkz
    traffic_access_token_test.go:263: [Status code: 502] Response body: {"sandboxId":"iwo0gfyr56wjvqk4g891y","message":"The sandbox is running but port is not open","port":8080,"code":502}
    traffic_access_token_test.go:292: 
        	Error Trace:	.../tests/proxies/traffic_access_token_test.go:292
        	Error:      	Received unexpected error:
        	            	Get "http://localhost:3002": context deadline exceeded (Client.Timeout exceeded while awaiting headers)
        	Test:       	TestSandboxWithTrafficAccessTokenAutoResumeViaProxy
--- FAIL: TestSandboxWithTrafficAccessTokenAutoResumeViaProxy (19.33s)

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1440f94c7c

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread packages/envd/internal/services/process/start_test.go Outdated
@linear-code

linear-code Bot commented May 13, 2026

Copy link
Copy Markdown

ENG-3933

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

Iterating over the m.channels slice header outside the lock in the fan-out loop is unsafe and can lead to race conditions where subscribers are skipped or receive duplicate values. The HasSubscribers() check in the process handler prevents stdout and stderr output from being sent to the system logs when no RPC client is connected, which results in a loss of observability for background processes.

Comment thread packages/envd/internal/services/process/handler/multiplex.go
Comment thread packages/envd/internal/services/process/handler/handler.go
Comment thread packages/envd/internal/services/process/handler/handler.go

@cursor cursor Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 1440f94c7c7d1c3e4ed1a0ddd260c41b12c286b4. Configure here.

Comment thread packages/envd/internal/services/process/handler/handler.go
@arkamar arkamar force-pushed the fix/envd-discard-output-on-disconnect branch from 1440f94 to 47ee515 Compare May 13, 2026 13:41
@arkamar arkamar marked this pull request as draft May 13, 2026 13:44

@claude claude Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additional findings (outside current diff — PR may have been updated during review):

  • 🟣 packages/envd/internal/services/process/handler/multiplex.go:55-67Pre-existing data race in the fan-out loop that this PR leaves intact. run() takes a slice-header snapshot of m.channels under RLock and iterates after unlocking, while remove() concurrently mutates the same backing array in place via append(m.channels[:i], m.channels[i+1:]...). With 3+ subscribers (Start RPC + concurrent Connect RPCs both call DataEvent.Fork()) and a non-tail cancel mid-iteration, the same value can be delivered twice to one client and skipped on another — silent stdout/stderr corruption. One-line fix: deep-copy under RLock (subs := append([]*subscriber[T]{}, m.channels...)).

    Extended reasoning...

    What is wrong

    In multiplex.go the fan-out loop snapshots the subscriber slice header under RLock and then iterates after releasing the lock:

    m.mu.RLock()
    subs := m.channels   // copies header only; shares backing array
    m.mu.RUnlock()
    
    for _, s := range subs {
        if s.isCancelled() { continue }
        select {
        case s.ch <- v:
        case <-s.done:
        }
    }

    Meanwhile remove() (line 159) mutates the same backing array in place under the write Lock:

    m.channels = append(m.channels[:i], m.channels[i+1:]...)

    Because m.channels[:i] has enough capacity (we are shrinking), append does not allocate a new array — it performs an in-place forward memmove. After removing index i from a length-N slice, slots i..N-2 are overwritten with the pointers from i+1..N-1, and the trailing slot N-1 retains its old pointer. This is a well-known Go gotcha (see the SliceTricks wiki note about zeroing the trailing element so the GC can reclaim it).

    Step-by-step proof

    Start state: m.channels = [A, B, C] (cap=3). One Start RPC plus two Connect RPCs gives this exact 3-subscriber shape via proc.DataEvent.Fork() (see start.go:106 and connect.go:30).

    1. run() enters with value v. It snapshots subs := m.channels → header {ptr=base, len=3, cap=3}. RUnlock.
    2. i=0: reads subs[0]=A, enters select { case A.ch <- v: case <-A.done: }. A.ch is unbuffered; the goroutine parks.
    3. Concurrently, A’s owner disconnects → cancel() calls remove(A). remove first calls s.cancel() (closes A.done) — the fan-out send unblocks via the <-s.done branch (A receives nothing). Then remove takes the write Lock and executes append(m.channels[:0], m.channels[1:]...). The in-place memmove writes base[0]=B, then base[1]=C. Result: backing array is [B, C, C]; m.channels header is {ptr=base, len=2}. The snapshot subs still has len=3.
    4. i=1: range subs reads subs[1] from the shared backing array. It is now C. The check s.isCancelled() returns false for C. Send v to C. ✓
    5. i=2: subs[2] is also C (the duplicated trailing pointer). isCancelled() is still false. Send v to C again. ✗

    Net effect: subscriber B misses v entirely; subscriber C receives v twice. For a stdout/stderr stream this is a duplicated chunk in one client and a missing chunk in another — silent corruption of the data stream. (With only 2 subscribers [A, B], removing A leaves [B, B] and B receives v twice — the bug is actually easier to trigger.)

    This is also a textbook data race under the Go memory model: the unsynchronized subs[i] reads in run() race with the Lock-protected backing-array writes in remove(). go test -race would flag it.

    Why existing code does not prevent it

    • The RLock/RUnlock pair only protects the header read; the backing array remains shared.
    • The isCancelled() check inside the loop does not save the duplicated subscriber — C is not cancelled, so the send proceeds the second time.
    • s.cancel() is called before remove() takes the write lock, so the fan-out’s per-subscriber select can unblock via <-s.done while remove() races to shift the backing array — providing the exact timing window.

    Trigger conditions and PR relevance

    The per-subscriber case s.ch <- v: blocks until the gRPC stream.Send completes; on a slow network this is arbitrarily long, so the window between iterations is wide. This PR is pre-existing in the strict sense — the snapshot+in-place-shift pattern was introduced in PR #2579 and the two offending lines are not modified here. However, this PR is centrally about fan-out correctness (it rewrites run(), Fork(), and adds HasSubscribers()), and the new “drain to void after cancel” semantics widen the window: previously a cancelled subscriber would wedge the fan-out at the first blocked send, so a race could only corrupt one value; now iteration continues across many values, so the corruption can repeat per value emitted while two clients remain alive.

    Fix

    One-line change in run() — deep-copy the slice under RLock:

    m.mu.RLock()
    subs := append([]*subscriber[T]{}, m.channels...)
    m.mu.RUnlock()

    Alternatively, change remove() to allocate a new backing array instead of shifting in place (m.channels = append([]*subscriber[T]{}, append(m.channels[:i:i], m.channels[i+1:]...)...)), but the snapshot fix is simpler and avoids the trailing-pointer pitfall everywhere.

Comment thread packages/envd/internal/services/process/handler/handler.go Outdated
arkamar added 2 commits May 14, 2026 12:43
…ing process

When a Start/Connect client disconnects while a process is producing
output, the fan-out loop drained the Source channel with no
subscribers, allocating 32 KiB per read cycle. With a fast producer,
envd RSS grew to hundreds of MiB in seconds, OOM-killing sandbox
processes.

Readers now reuse a single read buffer and only allocate + send when
HasSubscribers is true. The fan-out always consumes from Source and
drops values with no subscribers. The child process is never blocked
so servers and background processes stay responsive during disconnects.
@arkamar arkamar force-pushed the fix/envd-discard-output-on-disconnect branch from 47ee515 to 1069a1e Compare May 14, 2026 10:43
@arkamar arkamar changed the title fix(envd): discard output safely when no subscriber is connected fix(envd): discard output when no subscriber is connected May 14, 2026
@arkamar arkamar marked this pull request as ready for review May 14, 2026 11:44

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 1069a1e0a9

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment thread packages/envd/internal/services/process/handler/handler.go
@arkamar arkamar merged commit 8cf1795 into main May 14, 2026
56 checks passed
@arkamar arkamar deleted the fix/envd-discard-output-on-disconnect branch May 14, 2026 14:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants